您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12161|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;2 P& |4 q9 w# A, W" l
  2. require_once __DIR__ . '/Workerman/Autoloader.php';- J1 O4 \/ h( j0 r' i: H! n, S
  3. . e, ~) F+ h. a& P  K9 t
  4. $worker = new Worker();. I, z4 J7 v! T5 t& n2 q
  5. // 4个进程
    ; ]' Q6 A, l2 [: v. ^
  6. $worker->count = 4;7 H: c3 |, W( P9 Q8 l
  7. // 每个进程启动后在当前进程新增一个Worker监听+ G9 J9 B3 F% W! b9 y
  8. $worker->onWorkerStart = function($worker)$ N6 x& s& F# A; J0 [5 j; Q% @9 [) E! |
  9. {; q  A7 c, P# G- `
  10.     /**% r( @: L( V$ c2 S- Z
  11.      * 4个进程启动的时候都创建2016端口的Worker: L' f) r! P. g7 U& m) _
  12.      * 当执行到worker->listen()时会报Address already in use错误" V& S/ T1 U  i- Y6 N5 D* Z" W1 u: q
  13.      * 如果worker->count=1则不会报错% K3 I1 B  ^$ R/ u
  14.      */
    " u' i2 w- u3 A+ r& w, o8 |
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');' T+ H7 B+ r7 k/ i. |! l
  16.     $inner_worker->onMessage = 'on_message';
    % N* ^+ R8 M! D+ p" m
  17.     // 执行监听。这里会报Address already in use错误7 |7 D/ B& E- O, X# O+ \
  18.     $inner_worker->listen();
    ! R% ?( l& w+ A! ~" ^7 V
  19. };
    " Y6 V* I$ c+ X- K7 H

  20. : O' Z& @; O$ T
  21. $worker->onMessage = 'on_message';4 B& s+ B, O& V1 {+ n
  22. + q; s+ K+ Y5 s( J9 U
  23. function on_message($connection, $data)
    $ w& C3 u! p) C- T# |
  24. {
    ! p6 ^5 }+ l4 U' f8 a7 g3 R
  25.     $connection->send("hello\n");- `; D$ O8 E3 f4 r* m
  26. }& V& R3 A& ?5 _0 D# e

  27. ) X- V7 B" o7 Y  {
  28. // 运行worker
    # |7 g& P3 X* L) D
  29. Worker::runAll();
    : b8 Y: S) B  T6 ]! j& W
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    0 Q3 \3 S5 _4 |8 t% D8 `
  31. 0 b+ p% o. W" w' {! y( H
  32. use Workerman\Worker;& x/ v! o. I. N- z* z- t  q
  33. require_once './Workerman/Autoloader.php';- U$ ~8 _% V; S; {* @4 Q
  34. 8 S0 t9 ~1 M) z6 i: D9 C
  35. $worker = new Worker('text://0.0.0.0:2015');* h0 j' _: U' ^4 p  V' w
  36. // 4个进程
    & [8 M8 o5 ?: r4 X; O" \+ y
  37. $worker->count = 4;' x4 e+ U; ^" t) z4 A
  38. // 每个进程启动后在当前进程新增一个Worker监听" `+ x/ ^2 V4 o  N
  39. $worker->onWorkerStart = function($worker)( {$ g6 l5 v6 R% E( ~  |
  40. {
    0 h* F( x5 H: j
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ' u0 c2 f; z0 w4 K# J
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)2 W' L, V+ S& ?" c
  43.     $inner_worker->reusePort = true;
    9 Q& V! G, Y: i1 ]) M. Y
  44.     $inner_worker->onMessage = 'on_message';
    ( _" x/ e# {' g$ N) ]. U( b" {0 _
  45.     // 执行监听。正常监听不会报错
    7 {4 B4 E# A2 @& R2 i* [' G
  46.     $inner_worker->listen();' P; h/ \2 G+ h% h' d
  47. };5 ^8 Q+ }' G& h+ y; M
  48. 4 c; z: {3 \# u! R/ g( |% }7 {% S
  49. $worker->onMessage = 'on_message';
    ! |# ?9 l* [0 h! b, Q, \

  50. # E% j- b0 L. j# Z- q
  51. function on_message($connection, $data)' ^5 W/ e& P- }) X. c# F
  52. {3 C& B1 ~1 U; F6 R' f0 K
  53.     $connection->send("hello\n");
    , L+ n# I# b2 Q
  54. }
    . q3 n3 i" F  ^( Z9 c) V
  55.   [  `! w$ o+ S# a8 J8 A% o
  56. // 运行worker
    8 x' r7 x. {/ W
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    & n2 U$ _6 ~6 E( B% ]% P, ]7 a
  2. use Workerman\Worker;0 e5 h. T# _2 ^, h
  3. require_once './Workerman/Autoloader.php';8 X! \! ]8 Q! p
  4. // 初始化一个worker容器,监听1234端口
    6 w+ U7 H; v5 h' j; L
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ' t; `: X! n' ~7 |5 S3 U0 T
  6. 6 j: n" ^: u( j; }8 F( S/ O8 O
  7. /** C; _, `1 m% P/ F; v
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    + \0 i) [- _" R# r0 G
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    / [3 n  w5 D  h% @: \
  10. */% [. m4 k' s. b1 g8 K4 T4 V# e
  11. $worker->count = 1;
    " D, @( @2 V, W) c! n
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# K) o2 J% ]4 v( M2 f
  13. $worker->onWorkerStart = function($worker)
    3 T9 U  T, l1 l7 b% A- i; ?  V# e* S
  14. {
    9 [1 O) W' X9 T. _# }8 [# f+ v
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    . \0 C1 t0 @$ |3 }! r0 M
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    $ g* d' t0 O- k8 f/ U1 W  _4 J. o# c
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( g* X9 A8 _$ G
  18.     {
    * |* b, O  S  Z  b: M: A
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    " c" p; d; p" C8 y
  20.         $data = json_decode($buffer, true);! N3 S" D( g4 v
  21.         $uid = $data['uid'];
    % \' F0 o, |3 H
  22.         // 通过workerman,向uid的页面推送数据
    ' P; e4 o/ b: u
  23.         $ret = sendMessageByUid($uid, $buffer);
    1 a( e' Z6 E3 @' N5 [  Q( Z
  24.         // 返回推送结果+ `1 T' G( L$ Q& m" X1 X
  25.         $connection->send($ret ? 'ok' : 'fail');
    ' h7 a: v9 j" m. l: v1 ]1 Q
  26.     };' U" q( }& P2 j( T4 u
  27.     // ## 执行监听 ##
    3 V. v. y7 W0 d9 K# ~
  28.     $inner_text_worker->listen();
    0 u  J# |2 f5 O) Z
  29. };5 v8 G8 B. Y2 ^8 G( s/ p
  30. // 新增加一个属性,用来保存uid到connection的映射
    + o1 q" r4 ]  h/ U" K
  31. $worker->uidConnections = array();
    ( Q! A3 X9 s% K9 s# {( _0 m9 d+ i2 {
  32. // 当有客户端发来消息时执行的回调函数
    ! n. Y& I2 q4 m- i: W7 W6 h
  33. $worker->onMessage = function($connection, $data)" J) p& B6 l4 @- A
  34. {
    $ v( G8 m  E% a- I; p! y1 _1 n( C1 p
  35.     global $worker;
    ( g0 l7 q6 j4 g
  36.     // 判断当前客户端是否已经验证,既是否设置了uid) H: V8 t% j1 C: E( q
  37.     if(!isset($connection->uid))
    $ q3 D2 Q8 `* K' n% R! L9 N( l
  38.     {# J8 }8 O% ~. C; c( @/ r0 @; r0 G
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    5 Q! K; W7 Q8 p" y4 H- U
  40.        $connection->uid = $data;. j8 D( w' w2 _
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,7 B, d9 M9 Y8 n! l
  42.         * 实现针对特定uid推送数据- R( d5 w' ?: V- I! {+ t
  43.         */
    2 c, j/ D2 X9 u8 [
  44.        $worker->uidConnections[$connection->uid] = $connection;( g5 f+ x" z# S) f% l! N8 k2 \' V
  45.        return;
    $ k( p# H: }: d/ K+ O/ D! h
  46.     }
    & j/ r+ X  z" Z/ r
  47. };
    % S! p% I* b7 f! h: l) }7 r

  48. 6 @( a7 X) e% r$ H9 m
  49. // 当有客户端连接断开时; t  W. n1 [( a1 e) I$ i
  50. $worker->onClose = function($connection)3 q2 U" d" B0 s' ~
  51. {
    . d: ?" Y; Q* |0 Q! H( q
  52.     global $worker;
    # o* @+ k1 |" z5 u* Z8 G% _
  53.     if(isset($connection->uid))9 J- R5 ^' Q  ^* E$ h
  54.     {
    - @* }1 v2 Q2 o' {1 G: w7 L
  55.         // 连接断开时删除映射
    ! d1 e' [& i0 W% F. s, s
  56.         unset($worker->uidConnections[$connection->uid]);* D/ Z6 D' f7 ?4 [2 U  p" |" _
  57.     }0 L0 U* L- u6 C9 J
  58. };
    ; y0 q3 D' {  M# M# H: m/ q
  59. 5 P* k/ ^0 q! `+ P2 j
  60. // 向所有验证的用户推送数据9 e( ]4 W% ?( ~
  61. function broadcast($message)! C- x& D* b7 I0 W9 v% i4 {
  62. {
    8 c+ S! K  V0 Z- N1 R' d# t
  63.    global $worker;; w2 r$ _; F( u9 P; e
  64.    foreach($worker->uidConnections as $connection)
    ( E! l" r% I4 z5 I- \$ N9 K, J  Y
  65.    {
    / H- T( l7 l) _7 V5 R( F% q& ~
  66.         $connection->send($message);7 `$ h% w4 _* m/ k+ ]: J
  67.    }
    + `: `" u7 c/ T0 T6 z
  68. }' R6 z9 s) F9 E& c7 I6 o

  69. & S4 i- S7 V5 t! w+ i/ I1 G4 R
  70. // 针对uid推送数据
    # V' g& ?/ ?0 l6 H( Z9 J% q. J
  71. function sendMessageByUid($uid, $message)
    % W/ P. `+ F  W3 U
  72. {9 ]+ K" Z5 w7 m; e: W- t; c5 {1 W$ W
  73.     global $worker;' g, |: U$ }5 {* d# h, s$ g( J
  74.     if(isset($worker->uidConnections[$uid]))- [: W# P. l4 r1 G
  75.     {* V! E9 X; `( r6 ]% U( y
  76.         $connection = $worker->uidConnections[$uid];
    2 a$ R( {9 h' X* ^; v3 v; U
  77.         $connection->send($message);' M( l. O9 K8 b
  78.         return true;
    & R( B6 O3 c: A- x: q" e
  79.     }
    & `1 N: S* y5 X0 E! ]* T
  80.     return false;
    * }, |, v" J) ^
  81. }- K# ]+ Z2 |) y/ p: @& y
  82. ' M9 ?7 k* k+ F$ w
  83. // 运行所有的worker, T4 z! l* E: v. n3 M  v4 x
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');1 ^+ p6 }1 P3 H2 w/ I: e6 e
  2. ws.onopen = function(){- ]" V! w" k6 `# F4 P- m+ ^8 x$ G: w
  3.     var uid = 'uid1';( J$ p* p  ^5 Y9 I( K" a& {9 w
  4.     ws.send(uid);
    , a# U. w( \/ H% F# d: O
  5. };
    1 U4 X( m4 Z* P/ I' l6 D
  6. ws.onmessage = function(e){& O6 o7 C7 H; a$ P% F) {. |1 O+ t
  7.     alert(e.data);7 D; W) i7 ?/ {" M  ?2 f
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    $ p9 s$ R2 b* `  M! U9 [
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    $ I7 S+ `, m/ `3 V7 {7 h: D; H
  3. // 推送的数据,包含uid字段,表示是给这个uid推送% {3 o) L; s( e, p
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');3 I7 ^, h6 h' d) z2 ?) P' _. j( E
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符5 L0 q6 k1 f5 Q- p
  6. fwrite($client, json_encode($data)."\n");
    0 j( M( R4 y+ S
  7. // 读取推送结果$ {$ i+ q6 }4 j7 i( j% P6 T" j7 a
  8. echo fread($client, 8192);
复制代码

9 P3 Z( J5 z4 @( Z: F3 f, ]$ ^* ]9 n  u' V
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 17:05 , Processed in 0.140859 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!